1   /**
2    * Copyright 2014 Netflix, Inc.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package rx.internal.operators;
17  
18  import static org.junit.Assert.*;
19  
20  import java.util.Arrays;
21  import java.util.concurrent.*;
22  import java.util.concurrent.atomic.*;
23  
24  import org.junit.Test;
25  
26  import rx.*;
27  import rx.Observable.OnSubscribe;
28  import rx.functions.*;
29  import rx.internal.util.RxRingBuffer;
30  import rx.observables.ConnectableObservable;
31  import rx.observers.TestSubscriber;
32  import rx.schedulers.Schedulers;
33  import rx.schedulers.TestScheduler;
34  
35  public class OperatorPublishTest {
36  
37      @Test
38      public void testPublish() throws InterruptedException {
39          final AtomicInteger counter = new AtomicInteger();
40          ConnectableObservable<String> o = Observable.create(new OnSubscribe<String>() {
41  
42              @Override
43              public void call(final Subscriber<? super String> observer) {
44                  new Thread(new Runnable() {
45  
46                      @Override
47                      public void run() {
48                          counter.incrementAndGet();
49                          observer.onNext("one");
50                          observer.onCompleted();
51                      }
52                  }).start();
53              }
54          }).publish();
55  
56          final CountDownLatch latch = new CountDownLatch(2);
57  
58          // subscribe once
59          o.subscribe(new Action1<String>() {
60  
61              @Override
62              public void call(String v) {
63                  assertEquals("one", v);
64                  latch.countDown();
65              }
66          });
67  
68          // subscribe again
69          o.subscribe(new Action1<String>() {
70  
71              @Override
72              public void call(String v) {
73                  assertEquals("one", v);
74                  latch.countDown();
75              }
76          });
77  
78          Subscription s = o.connect();
79          try {
80              if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
81                  fail("subscriptions did not receive values");
82              }
83              assertEquals(1, counter.get());
84          } finally {
85              s.unsubscribe();
86          }
87      }
88  
89      @Test
90      public void testBackpressureFastSlow() {
91          ConnectableObservable<Integer> is = Observable.range(1, RxRingBuffer.SIZE * 2).publish();
92          Observable<Integer> fast = is.observeOn(Schedulers.computation()).doOnCompleted(new Action0() {
93  
94              @Override
95              public void call() {
96                  System.out.println("^^^^^^^^^^^^^ completed FAST");
97              }
98  
99          });
100         Observable<Integer> slow = is.observeOn(Schedulers.computation()).map(new Func1<Integer, Integer>() {
101             int c = 0;
102 
103             @Override
104             public Integer call(Integer i) {
105                 if (c == 0) {
106                     try {
107                         Thread.sleep(500);
108                     } catch (InterruptedException e) {
109                     }
110                 }
111                 c++;
112                 return i;
113             }
114 
115         }).doOnCompleted(new Action0() {
116 
117             @Override
118             public void call() {
119                 System.out.println("^^^^^^^^^^^^^ completed SLOW");
120             }
121 
122         });
123 
124         TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
125         Observable.merge(fast, slow).subscribe(ts);
126         is.connect();
127         ts.awaitTerminalEvent();
128         ts.assertNoErrors();
129         assertEquals(RxRingBuffer.SIZE * 4, ts.getOnNextEvents().size());
130     }
131 
132     // use case from https://github.com/ReactiveX/RxJava/issues/1732
133     @Test
134     public void testTakeUntilWithPublishedStreamUsingSelector() {
135         final AtomicInteger emitted = new AtomicInteger();
136         Observable<Integer> xs = Observable.range(0, RxRingBuffer.SIZE * 2).doOnNext(new Action1<Integer>() {
137 
138             @Override
139             public void call(Integer t1) {
140                 emitted.incrementAndGet();
141             }
142 
143         });
144         TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
145         xs.publish(new Func1<Observable<Integer>, Observable<Integer>>() {
146 
147             @Override
148             public Observable<Integer> call(Observable<Integer> xs) {
149                 return xs.takeUntil(xs.skipWhile(new Func1<Integer, Boolean>() {
150 
151                     @Override
152                     public Boolean call(Integer i) {
153                         return i <= 3;
154                     }
155 
156                 }));
157             }
158 
159         }).subscribe(ts);
160         ts.awaitTerminalEvent();
161         ts.assertNoErrors();
162         ts.assertReceivedOnNext(Arrays.asList(0, 1, 2, 3));
163         assertEquals(5, emitted.get());
164         System.out.println(ts.getOnNextEvents());
165     }
166 
167     // use case from https://github.com/ReactiveX/RxJava/issues/1732
168     @Test
169     public void testTakeUntilWithPublishedStream() {
170         Observable<Integer> xs = Observable.range(0, RxRingBuffer.SIZE * 2);
171         TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
172         ConnectableObservable<Integer> xsp = xs.publish();
173         xsp.takeUntil(xsp.skipWhile(new Func1<Integer, Boolean>() {
174 
175             @Override
176             public Boolean call(Integer i) {
177                 return i <= 3;
178             }
179 
180         })).subscribe(ts);
181         xsp.connect();
182         System.out.println(ts.getOnNextEvents());
183     }
184 
185     @Test(timeout = 10000)
186     public void testBackpressureTwoConsumers() {
187         final AtomicInteger sourceEmission = new AtomicInteger();
188         final AtomicBoolean sourceUnsubscribed = new AtomicBoolean();
189         final Observable<Integer> source = Observable.range(1, 100)
190                 .doOnNext(new Action1<Integer>() {
191                     @Override
192                     public void call(Integer t1) {
193                         sourceEmission.incrementAndGet();
194                     }
195                 })
196                 .doOnUnsubscribe(new Action0() {
197                     @Override
198                     public void call() {
199                         sourceUnsubscribed.set(true);
200                     }
201                 }).share();
202         ;
203         
204         final AtomicBoolean child1Unsubscribed = new AtomicBoolean();
205         final AtomicBoolean child2Unsubscribed = new AtomicBoolean();
206 
207         final TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>();
208 
209         final TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>() {
210             @Override
211             public void onNext(Integer t) {
212                 if (getOnNextEvents().size() == 2) {
213                     source.doOnUnsubscribe(new Action0() {
214                         @Override
215                         public void call() {
216                             child2Unsubscribed.set(true);
217                         }
218                     }).take(5).subscribe(ts2);
219                 }
220                 super.onNext(t);
221             }
222         };
223         
224         source.doOnUnsubscribe(new Action0() {
225             @Override
226             public void call() {
227                 child1Unsubscribed.set(true);
228             }
229         }).take(5).subscribe(ts1);
230         
231         ts1.awaitTerminalEvent();
232         ts2.awaitTerminalEvent();
233         
234         ts1.assertNoErrors();
235         ts2.assertNoErrors();
236         
237         assertTrue(sourceUnsubscribed.get());
238         assertTrue(child1Unsubscribed.get());
239         assertTrue(child2Unsubscribed.get());
240         
241         ts1.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5));
242         ts2.assertReceivedOnNext(Arrays.asList(4, 5, 6, 7, 8));
243         
244         assertEquals(8, sourceEmission.get());
245     }
246 
247     @Test
248     public void testConnectWithNoSubscriber() {
249         TestScheduler scheduler = new TestScheduler();
250         ConnectableObservable<Long> co = Observable.timer(10, 10, TimeUnit.MILLISECONDS, scheduler).take(3).publish();
251         co.connect();
252         // Emit 0
253         scheduler.advanceTimeBy(15, TimeUnit.MILLISECONDS);
254         TestSubscriber<Long> subscriber = new TestSubscriber<Long>();
255         co.subscribe(subscriber);
256         // Emit 1 and 2
257         scheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS);
258         subscriber.assertReceivedOnNext(Arrays.asList(1L, 2L));
259         subscriber.assertNoErrors();
260         subscriber.assertTerminalEvent();
261     }
262 }